package com.lagou.bak;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/**
 * Streaming job that groups socket text lines into per-key processing-time
 * session windows and prints the concatenated contents of each window.
 *
 * <p>Pipeline: socket source ({@code hdp-1:7777}) -> map each line to
 * {@code (line, 10)} -> key by the line -> session window with a 10 second
 * gap -> concatenate every element of the window -> print.
 */
public class WindowTest3 {

    /** Pairs every incoming line with the constant value {@code 10}. */
    private static final class LineToPair implements MapFunction<String, Tuple2<String, Integer>> {
        private static final long serialVersionUID = 1L;

        @Override
        public Tuple2<String, Integer> map(String line) throws Exception {
            return new Tuple2<>(line, 10);
        }
    }

    /**
     * Emits one string per window: for each element, its {@code f0} followed
     * directly by its {@code f1}, appended in iteration order with no separator.
     */
    private static final class ConcatWindowContents
            implements WindowFunction<Tuple2<String, Integer>, String, String, TimeWindow> {
        private static final long serialVersionUID = 1L;

        @Override
        public void apply(String key,
                          TimeWindow window,
                          Iterable<Tuple2<String, Integer>> elements,
                          Collector<String> out) throws Exception {
            StringBuilder joined = new StringBuilder();
            elements.forEach(pair -> joined.append(pair.f0).append(pair.f1));
            out.collect(joined.toString());
        }
    }

    /**
     * Builds the pipeline and submits it for execution.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Source: one record per text line received on the socket.
        DataStreamSource<String> lines = env.socketTextStream("hdp-1", 7777);

        SingleOutputStreamOperator<String> windowed = lines
                .map(new LineToPair())
                // Identical lines share a key, and therefore a session.
                .keyBy(pair -> pair.f0)
                // A session is closed by the 10 second gap configured here.
                .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
                .apply(new ConcatWindowContents());

        windowed.print();
        env.execute();
    }
}



















